
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
  "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">

<html xmlns="http://www.w3.org/1999/xhtml" lang="en">
  <head>
    <meta http-equiv="X-UA-Compatible" content="IE=Edge" />
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    <title>tensorflowonspark.dfutil &#8212; TensorFlowOnSpark 1.3.3 documentation</title>
    <link rel="stylesheet" href="../../_static/classic.css" type="text/css" />
    <link rel="stylesheet" href="../../_static/pygments.css" type="text/css" />
    <script type="text/javascript" id="documentation_options" data-url_root="../../" src="../../_static/documentation_options.js"></script>
    <script type="text/javascript" src="../../_static/jquery.js"></script>
    <script type="text/javascript" src="../../_static/underscore.js"></script>
    <script type="text/javascript" src="../../_static/doctools.js"></script>
    <link rel="index" title="Index" href="../../genindex.html" />
    <link rel="search" title="Search" href="../../search.html" /> 
  </head><body>
    <!-- Top navigation bar: shortcuts to the general index and the Python module index,
         followed by the breadcrumb trail from the documentation root to "Module code". -->
    <div class="related" role="navigation" aria-label="related navigation">
      <h3>Navigation</h3>
      <ul>
        <li class="right" style="margin-right: 10px">
          <a href="../../genindex.html" title="General Index" accesskey="I">index</a></li>
        <li class="right">
          <a href="../../py-modindex.html" title="Python Module Index">modules</a> |</li>
        <li class="nav-item nav-item-0"><a href="../../index.html">TensorFlowOnSpark 1.3.3 documentation</a> &#187;</li>
        <li class="nav-item nav-item-1"><a href="../index.html" accesskey="U">Module code</a> &#187;</li>
      </ul>
    </div>

    <div class="document">
      <div class="documentwrapper">
        <div class="bodywrapper">
          <div class="body" role="main">
            
  <h1>Source code for tensorflowonspark.dfutil</h1><div class="highlight"><pre>
<span></span><span class="c1"># Copyright 2017 Yahoo Inc.</span>
<span class="c1"># Licensed under the terms of the Apache 2.0 license.</span>
<span class="c1"># Please see LICENSE file in the project root for terms.</span>
<span class="sd">&quot;&quot;&quot;A collection of utility functions for loading/saving TensorFlow TFRecords files as Spark DataFrames.&quot;&quot;&quot;</span>

<span class="kn">from</span> <span class="nn">__future__</span> <span class="k">import</span> <span class="n">absolute_import</span>
<span class="kn">from</span> <span class="nn">__future__</span> <span class="k">import</span> <span class="n">division</span>
<span class="kn">from</span> <span class="nn">__future__</span> <span class="k">import</span> <span class="n">nested_scopes</span>
<span class="kn">from</span> <span class="nn">__future__</span> <span class="k">import</span> <span class="n">print_function</span>

<span class="kn">import</span> <span class="nn">tensorflow</span> <span class="k">as</span> <span class="nn">tf</span>
<span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="k">import</span> <span class="n">Row</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.types</span> <span class="k">import</span> <span class="n">ArrayType</span><span class="p">,</span> <span class="n">BinaryType</span><span class="p">,</span> <span class="n">DoubleType</span><span class="p">,</span> <span class="n">LongType</span><span class="p">,</span> <span class="n">StringType</span><span class="p">,</span> <span class="n">StructField</span><span class="p">,</span> <span class="n">StructType</span>

<span class="n">loadedDF</span> <span class="o">=</span> <span class="p">{}</span>       <span class="c1"># Stores origin paths of loaded DataFrames (df =&gt; path)</span>


<div class="viewcode-block" id="isLoadedDF"><a class="viewcode-back" href="../../tensorflowonspark.dfutil.html#tensorflowonspark.dfutil.isLoadedDF">[docs]</a><span class="k">def</span> <span class="nf">isLoadedDF</span><span class="p">(</span><span class="n">df</span><span class="p">):</span>
  <span class="sd">&quot;&quot;&quot;Returns True if the input DataFrame was produced by the loadTFRecords() method.</span>

<span class="sd">  This is primarily used by the Spark ML Pipelines APIs.</span>

<span class="sd">  Args:</span>
<span class="sd">    :df: Spark Dataframe</span>
<span class="sd">  &quot;&quot;&quot;</span>
  <span class="k">return</span> <span class="n">df</span> <span class="ow">in</span> <span class="n">loadedDF</span></div>


<div class="viewcode-block" id="saveAsTFRecords"><a class="viewcode-back" href="../../tensorflowonspark.dfutil.html#tensorflowonspark.dfutil.saveAsTFRecords">[docs]</a><span class="k">def</span> <span class="nf">saveAsTFRecords</span><span class="p">(</span><span class="n">df</span><span class="p">,</span> <span class="n">output_dir</span><span class="p">):</span>
  <span class="sd">&quot;&quot;&quot;Save a Spark DataFrame as TFRecords.</span>

<span class="sd">  This will convert the DataFrame rows to TFRecords prior to saving.</span>

<span class="sd">  Args:</span>
<span class="sd">    :df: Spark DataFrame</span>
<span class="sd">    :output_dir: Path to save TFRecords</span>
<span class="sd">  &quot;&quot;&quot;</span>
  <span class="n">tf_rdd</span> <span class="o">=</span> <span class="n">df</span><span class="o">.</span><span class="n">rdd</span><span class="o">.</span><span class="n">mapPartitions</span><span class="p">(</span><span class="n">toTFExample</span><span class="p">(</span><span class="n">df</span><span class="o">.</span><span class="n">dtypes</span><span class="p">))</span>
  <span class="n">tf_rdd</span><span class="o">.</span><span class="n">saveAsNewAPIHadoopFile</span><span class="p">(</span><span class="n">output_dir</span><span class="p">,</span> <span class="s2">&quot;org.tensorflow.hadoop.io.TFRecordFileOutputFormat&quot;</span><span class="p">,</span>
                                <span class="n">keyClass</span><span class="o">=</span><span class="s2">&quot;org.apache.hadoop.io.BytesWritable&quot;</span><span class="p">,</span>
                                <span class="n">valueClass</span><span class="o">=</span><span class="s2">&quot;org.apache.hadoop.io.NullWritable&quot;</span><span class="p">)</span></div>


<div class="viewcode-block" id="loadTFRecords"><a class="viewcode-back" href="../../tensorflowonspark.dfutil.html#tensorflowonspark.dfutil.loadTFRecords">[docs]</a><span class="k">def</span> <span class="nf">loadTFRecords</span><span class="p">(</span><span class="n">sc</span><span class="p">,</span> <span class="n">input_dir</span><span class="p">,</span> <span class="n">binary_features</span><span class="o">=</span><span class="p">[]):</span>
  <span class="sd">&quot;&quot;&quot;Load TFRecords from disk into a Spark DataFrame.</span>

<span class="sd">  This will attempt to automatically convert the tf.train.Example features into Spark DataFrame columns of equivalent types.</span>

<span class="sd">  Note: TensorFlow represents both strings and binary types as tf.train.BytesList, and we need to</span>
<span class="sd">  disambiguate these types for Spark DataFrames DTypes (StringType and BinaryType), so we require a &quot;hint&quot;</span>
<span class="sd">  from the caller in the ``binary_features`` argument.</span>

<span class="sd">  Args:</span>
<span class="sd">    :sc: SparkContext</span>
<span class="sd">    :input_dir: location of TFRecords on disk.</span>
<span class="sd">    :binary_features: a list of tf.train.Example features which are expected to be binary/bytearrays.</span>

<span class="sd">  Returns:</span>
<span class="sd">    A Spark DataFrame mirroring the tf.train.Example schema.</span>
<span class="sd">  &quot;&quot;&quot;</span>
  <span class="kn">import</span> <span class="nn">tensorflow</span> <span class="k">as</span> <span class="nn">tf</span>

  <span class="n">tfr_rdd</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">newAPIHadoopFile</span><span class="p">(</span><span class="n">input_dir</span><span class="p">,</span> <span class="s2">&quot;org.tensorflow.hadoop.io.TFRecordFileInputFormat&quot;</span><span class="p">,</span>
                                <span class="n">keyClass</span><span class="o">=</span><span class="s2">&quot;org.apache.hadoop.io.BytesWritable&quot;</span><span class="p">,</span>
                                <span class="n">valueClass</span><span class="o">=</span><span class="s2">&quot;org.apache.hadoop.io.NullWritable&quot;</span><span class="p">)</span>

  <span class="c1"># infer Spark SQL types from tf.Example</span>
  <span class="n">record</span> <span class="o">=</span> <span class="n">tfr_rdd</span><span class="o">.</span><span class="n">take</span><span class="p">(</span><span class="mi">1</span><span class="p">)[</span><span class="mi">0</span><span class="p">]</span>
  <span class="n">example</span> <span class="o">=</span> <span class="n">tf</span><span class="o">.</span><span class="n">train</span><span class="o">.</span><span class="n">Example</span><span class="p">()</span>
  <span class="n">example</span><span class="o">.</span><span class="n">ParseFromString</span><span class="p">(</span><span class="nb">bytes</span><span class="p">(</span><span class="n">record</span><span class="p">[</span><span class="mi">0</span><span class="p">]))</span>
  <span class="n">schema</span> <span class="o">=</span> <span class="n">infer_schema</span><span class="p">(</span><span class="n">example</span><span class="p">,</span> <span class="n">binary_features</span><span class="p">)</span>

  <span class="c1"># convert serialized protobuf to tf.Example to Row</span>
  <span class="n">example_rdd</span> <span class="o">=</span> <span class="n">tfr_rdd</span><span class="o">.</span><span class="n">mapPartitions</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">fromTFExample</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="n">binary_features</span><span class="p">))</span>

  <span class="c1"># create a Spark DataFrame from RDD[Row]</span>
  <span class="n">df</span> <span class="o">=</span> <span class="n">example_rdd</span><span class="o">.</span><span class="n">toDF</span><span class="p">(</span><span class="n">schema</span><span class="p">)</span>

  <span class="c1"># save reference of this dataframe</span>
  <span class="n">loadedDF</span><span class="p">[</span><span class="n">df</span><span class="p">]</span> <span class="o">=</span> <span class="n">input_dir</span>
  <span class="k">return</span> <span class="n">df</span></div>


<div class="viewcode-block" id="toTFExample"><a class="viewcode-back" href="../../tensorflowonspark.dfutil.html#tensorflowonspark.dfutil.toTFExample">[docs]</a><span class="k">def</span> <span class="nf">toTFExample</span><span class="p">(</span><span class="n">dtypes</span><span class="p">):</span>
  <span class="sd">&quot;&quot;&quot;mapPartition function to convert a Spark RDD of Row into an RDD of serialized tf.train.Example bytestring.</span>

<span class="sd">  Note that tf.train.Example is a fairly flat structure with limited datatypes, e.g. tf.train.FloatList,</span>
<span class="sd">  tf.train.Int64List, and tf.train.BytesList, so most DataFrame types will be coerced into one of these types.</span>

<span class="sd">  Args:</span>
<span class="sd">    :dtypes: the DataFrame.dtypes of the source DataFrame.</span>

<span class="sd">  Returns:</span>
<span class="sd">    A mapPartition function which converts the source DataFrame into tf.train.Example bytestrings.</span>
<span class="sd">  &quot;&quot;&quot;</span>
  <span class="k">def</span> <span class="nf">_toTFExample</span><span class="p">(</span><span class="nb">iter</span><span class="p">):</span>

    <span class="c1"># supported type mappings between DataFrame.dtypes and tf.train.Feature types</span>
    <span class="n">float_dtypes</span> <span class="o">=</span> <span class="p">[</span><span class="s1">&#39;float&#39;</span><span class="p">,</span> <span class="s1">&#39;double&#39;</span><span class="p">]</span>
    <span class="n">int64_dtypes</span> <span class="o">=</span> <span class="p">[</span><span class="s1">&#39;boolean&#39;</span><span class="p">,</span> <span class="s1">&#39;tinyint&#39;</span><span class="p">,</span> <span class="s1">&#39;smallint&#39;</span><span class="p">,</span> <span class="s1">&#39;int&#39;</span><span class="p">,</span> <span class="s1">&#39;bigint&#39;</span><span class="p">,</span> <span class="s1">&#39;long&#39;</span><span class="p">]</span>
    <span class="n">bytes_dtypes</span> <span class="o">=</span> <span class="p">[</span><span class="s1">&#39;binary&#39;</span><span class="p">,</span> <span class="s1">&#39;string&#39;</span><span class="p">]</span>
    <span class="n">float_list_dtypes</span> <span class="o">=</span> <span class="p">[</span><span class="s1">&#39;array&lt;float&gt;&#39;</span><span class="p">,</span> <span class="s1">&#39;array&lt;double&gt;&#39;</span><span class="p">]</span>
    <span class="n">int64_list_dtypes</span> <span class="o">=</span> <span class="p">[</span><span class="s1">&#39;array&lt;boolean&gt;&#39;</span><span class="p">,</span> <span class="s1">&#39;array&lt;tinyint&gt;&#39;</span><span class="p">,</span> <span class="s1">&#39;array&lt;smallint&gt;&#39;</span><span class="p">,</span> <span class="s1">&#39;array&lt;int&gt;&#39;</span><span class="p">,</span> <span class="s1">&#39;array&lt;bigint&gt;&#39;</span><span class="p">,</span> <span class="s1">&#39;array&lt;long&gt;&#39;</span><span class="p">]</span>

    <span class="k">def</span> <span class="nf">_toTFFeature</span><span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">dtype</span><span class="p">,</span> <span class="n">row</span><span class="p">):</span>
      <span class="n">feature</span> <span class="o">=</span> <span class="kc">None</span>
      <span class="k">if</span> <span class="n">dtype</span> <span class="ow">in</span> <span class="n">float_dtypes</span><span class="p">:</span>
        <span class="n">feature</span> <span class="o">=</span> <span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">tf</span><span class="o">.</span><span class="n">train</span><span class="o">.</span><span class="n">Feature</span><span class="p">(</span><span class="n">float_list</span><span class="o">=</span><span class="n">tf</span><span class="o">.</span><span class="n">train</span><span class="o">.</span><span class="n">FloatList</span><span class="p">(</span><span class="n">value</span><span class="o">=</span><span class="p">[</span><span class="n">row</span><span class="p">[</span><span class="n">name</span><span class="p">]])))</span>
      <span class="k">elif</span> <span class="n">dtype</span> <span class="ow">in</span> <span class="n">int64_dtypes</span><span class="p">:</span>
        <span class="n">feature</span> <span class="o">=</span> <span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">tf</span><span class="o">.</span><span class="n">train</span><span class="o">.</span><span class="n">Feature</span><span class="p">(</span><span class="n">int64_list</span><span class="o">=</span><span class="n">tf</span><span class="o">.</span><span class="n">train</span><span class="o">.</span><span class="n">Int64List</span><span class="p">(</span><span class="n">value</span><span class="o">=</span><span class="p">[</span><span class="n">row</span><span class="p">[</span><span class="n">name</span><span class="p">]])))</span>
      <span class="k">elif</span> <span class="n">dtype</span> <span class="ow">in</span> <span class="n">bytes_dtypes</span><span class="p">:</span>
        <span class="k">if</span> <span class="n">dtype</span> <span class="o">==</span> <span class="s1">&#39;binary&#39;</span><span class="p">:</span>
          <span class="n">feature</span> <span class="o">=</span> <span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">tf</span><span class="o">.</span><span class="n">train</span><span class="o">.</span><span class="n">Feature</span><span class="p">(</span><span class="n">bytes_list</span><span class="o">=</span><span class="n">tf</span><span class="o">.</span><span class="n">train</span><span class="o">.</span><span class="n">BytesList</span><span class="p">(</span><span class="n">value</span><span class="o">=</span><span class="p">[</span><span class="nb">bytes</span><span class="p">(</span><span class="n">row</span><span class="p">[</span><span class="n">name</span><span class="p">])])))</span>
        <span class="k">else</span><span class="p">:</span>
          <span class="n">feature</span> <span class="o">=</span> <span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">tf</span><span class="o">.</span><span class="n">train</span><span class="o">.</span><span class="n">Feature</span><span class="p">(</span><span class="n">bytes_list</span><span class="o">=</span><span class="n">tf</span><span class="o">.</span><span class="n">train</span><span class="o">.</span><span class="n">BytesList</span><span class="p">(</span><span class="n">value</span><span class="o">=</span><span class="p">[</span><span class="nb">str</span><span class="p">(</span><span class="n">row</span><span class="p">[</span><span class="n">name</span><span class="p">])</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">&#39;utf-8&#39;</span><span class="p">)])))</span>
      <span class="k">elif</span> <span class="n">dtype</span> <span class="ow">in</span> <span class="n">float_list_dtypes</span><span class="p">:</span>
        <span class="n">feature</span> <span class="o">=</span> <span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">tf</span><span class="o">.</span><span class="n">train</span><span class="o">.</span><span class="n">Feature</span><span class="p">(</span><span class="n">float_list</span><span class="o">=</span><span class="n">tf</span><span class="o">.</span><span class="n">train</span><span class="o">.</span><span class="n">FloatList</span><span class="p">(</span><span class="n">value</span><span class="o">=</span><span class="n">row</span><span class="p">[</span><span class="n">name</span><span class="p">])))</span>
      <span class="k">elif</span> <span class="n">dtype</span> <span class="ow">in</span> <span class="n">int64_list_dtypes</span><span class="p">:</span>
        <span class="n">feature</span> <span class="o">=</span> <span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">tf</span><span class="o">.</span><span class="n">train</span><span class="o">.</span><span class="n">Feature</span><span class="p">(</span><span class="n">int64_list</span><span class="o">=</span><span class="n">tf</span><span class="o">.</span><span class="n">train</span><span class="o">.</span><span class="n">Int64List</span><span class="p">(</span><span class="n">value</span><span class="o">=</span><span class="n">row</span><span class="p">[</span><span class="n">name</span><span class="p">])))</span>
      <span class="k">else</span><span class="p">:</span>
        <span class="k">raise</span> <span class="ne">Exception</span><span class="p">(</span><span class="s2">&quot;Unsupported dtype: </span><span class="si">{0}</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">dtype</span><span class="p">))</span>
      <span class="k">return</span> <span class="n">feature</span>

    <span class="n">results</span> <span class="o">=</span> <span class="p">[]</span>
    <span class="k">for</span> <span class="n">row</span> <span class="ow">in</span> <span class="nb">iter</span><span class="p">:</span>
      <span class="n">features</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">([</span><span class="n">_toTFFeature</span><span class="p">(</span><span class="n">name</span><span class="p">,</span> <span class="n">dtype</span><span class="p">,</span> <span class="n">row</span><span class="p">)</span> <span class="k">for</span> <span class="n">name</span><span class="p">,</span> <span class="n">dtype</span> <span class="ow">in</span> <span class="n">dtypes</span><span class="p">])</span>
      <span class="n">example</span> <span class="o">=</span> <span class="n">tf</span><span class="o">.</span><span class="n">train</span><span class="o">.</span><span class="n">Example</span><span class="p">(</span><span class="n">features</span><span class="o">=</span><span class="n">tf</span><span class="o">.</span><span class="n">train</span><span class="o">.</span><span class="n">Features</span><span class="p">(</span><span class="n">feature</span><span class="o">=</span><span class="n">features</span><span class="p">))</span>
      <span class="n">results</span><span class="o">.</span><span class="n">append</span><span class="p">((</span><span class="nb">bytearray</span><span class="p">(</span><span class="n">example</span><span class="o">.</span><span class="n">SerializeToString</span><span class="p">()),</span> <span class="kc">None</span><span class="p">))</span>
    <span class="k">return</span> <span class="n">results</span>

  <span class="k">return</span> <span class="n">_toTFExample</span></div>


<div class="viewcode-block" id="infer_schema"><a class="viewcode-back" href="../../tensorflowonspark.dfutil.html#tensorflowonspark.dfutil.infer_schema">[docs]</a><span class="k">def</span> <span class="nf">infer_schema</span><span class="p">(</span><span class="n">example</span><span class="p">,</span> <span class="n">binary_features</span><span class="o">=</span><span class="p">[]):</span>
  <span class="sd">&quot;&quot;&quot;Given a tf.train.Example, infer the Spark DataFrame schema (StructFields).</span>

<span class="sd">  Note: TensorFlow represents both strings and binary types as tf.train.BytesList, and we need to</span>
<span class="sd">  disambiguate these types for Spark DataFrames DTypes (StringType and BinaryType), so we require a &quot;hint&quot;</span>
<span class="sd">  from the caller in the ``binary_features`` argument.</span>

<span class="sd">  Args:</span>
<span class="sd">    :example: a tf.train.Example</span>
<span class="sd">    :binary_features: a list of tf.train.Example features which are expected to be binary/bytearrays.</span>

<span class="sd">  Returns:</span>
<span class="sd">    A DataFrame StructType schema</span>
<span class="sd">  &quot;&quot;&quot;</span>
  <span class="k">def</span> <span class="nf">_infer_sql_type</span><span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">v</span><span class="p">):</span>
    <span class="c1"># special handling for binary features</span>
    <span class="k">if</span> <span class="n">k</span> <span class="ow">in</span> <span class="n">binary_features</span><span class="p">:</span>
      <span class="k">return</span> <span class="n">BinaryType</span><span class="p">()</span>

    <span class="k">if</span> <span class="n">v</span><span class="o">.</span><span class="n">int64_list</span><span class="o">.</span><span class="n">value</span><span class="p">:</span>
      <span class="n">result</span> <span class="o">=</span> <span class="n">v</span><span class="o">.</span><span class="n">int64_list</span><span class="o">.</span><span class="n">value</span>
      <span class="n">sql_type</span> <span class="o">=</span> <span class="n">LongType</span><span class="p">()</span>
    <span class="k">elif</span> <span class="n">v</span><span class="o">.</span><span class="n">float_list</span><span class="o">.</span><span class="n">value</span><span class="p">:</span>
      <span class="n">result</span> <span class="o">=</span> <span class="n">v</span><span class="o">.</span><span class="n">float_list</span><span class="o">.</span><span class="n">value</span>
      <span class="n">sql_type</span> <span class="o">=</span> <span class="n">DoubleType</span><span class="p">()</span>
    <span class="k">else</span><span class="p">:</span>
      <span class="n">result</span> <span class="o">=</span> <span class="n">v</span><span class="o">.</span><span class="n">bytes_list</span><span class="o">.</span><span class="n">value</span>
      <span class="n">sql_type</span> <span class="o">=</span> <span class="n">StringType</span><span class="p">()</span>

    <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">result</span><span class="p">)</span> <span class="o">&gt;</span> <span class="mi">1</span><span class="p">:</span>             <span class="c1"># represent multi-item tensors as Spark SQL ArrayType() of base types</span>
      <span class="k">return</span> <span class="n">ArrayType</span><span class="p">(</span><span class="n">sql_type</span><span class="p">)</span>
    <span class="k">else</span><span class="p">:</span>                           <span class="c1"># represent everything else as base types (and empty tensors as StringType())</span>
      <span class="k">return</span> <span class="n">sql_type</span>

  <span class="k">return</span> <span class="n">StructType</span><span class="p">([</span><span class="n">StructField</span><span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">_infer_sql_type</span><span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">v</span><span class="p">),</span> <span class="kc">True</span><span class="p">)</span> <span class="k">for</span> <span class="n">k</span><span class="p">,</span> <span class="n">v</span> <span class="ow">in</span> <span class="nb">sorted</span><span class="p">(</span><span class="n">example</span><span class="o">.</span><span class="n">features</span><span class="o">.</span><span class="n">feature</span><span class="o">.</span><span class="n">items</span><span class="p">())])</span></div>


<div class="viewcode-block" id="fromTFExample"><a class="viewcode-back" href="../../tensorflowonspark.dfutil.html#tensorflowonspark.dfutil.fromTFExample">[docs]</a><span class="k">def</span> <span class="nf">fromTFExample</span><span class="p">(</span><span class="nb">iter</span><span class="p">,</span> <span class="n">binary_features</span><span class="o">=</span><span class="p">[]):</span>
  <span class="sd">&quot;&quot;&quot;mapPartition function to convert an RDD of serialized tf.train.Example bytestring into an RDD of Row.</span>

<span class="sd">  Note: TensorFlow represents both strings and binary types as tf.train.BytesList, and we need to</span>
<span class="sd">  disambiguate these types for Spark DataFrames DTypes (StringType and BinaryType), so we require a &quot;hint&quot;</span>
<span class="sd">  from the caller in the ``binary_features`` argument.</span>

<span class="sd">  Args:</span>
<span class="sd">    :iter: the RDD partition iterator</span>
<span class="sd">    :binary_features: a list of tf.train.Example features which are expected to be binary/bytearrays.</span>

<span class="sd">  Returns:</span>
<span class="sd">    An array/iterator of DataFrame Row with features converted into columns.</span>
<span class="sd">  &quot;&quot;&quot;</span>
  <span class="c1"># convert from protobuf-like dict to DataFrame-friendly dict</span>
  <span class="k">def</span> <span class="nf">_get_value</span><span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">v</span><span class="p">):</span>
    <span class="k">if</span> <span class="n">v</span><span class="o">.</span><span class="n">int64_list</span><span class="o">.</span><span class="n">value</span><span class="p">:</span>
      <span class="n">result</span> <span class="o">=</span> <span class="n">v</span><span class="o">.</span><span class="n">int64_list</span><span class="o">.</span><span class="n">value</span>
    <span class="k">elif</span> <span class="n">v</span><span class="o">.</span><span class="n">float_list</span><span class="o">.</span><span class="n">value</span><span class="p">:</span>
      <span class="n">result</span> <span class="o">=</span> <span class="n">v</span><span class="o">.</span><span class="n">float_list</span><span class="o">.</span><span class="n">value</span>
    <span class="k">else</span><span class="p">:</span>  <span class="c1"># string or bytearray</span>
      <span class="k">if</span> <span class="n">k</span> <span class="ow">in</span> <span class="n">binary_features</span><span class="p">:</span>
        <span class="k">return</span> <span class="nb">bytearray</span><span class="p">(</span><span class="n">v</span><span class="o">.</span><span class="n">bytes_list</span><span class="o">.</span><span class="n">value</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span>
      <span class="k">else</span><span class="p">:</span>
        <span class="k">return</span> <span class="n">v</span><span class="o">.</span><span class="n">bytes_list</span><span class="o">.</span><span class="n">value</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s1">&#39;utf-8&#39;</span><span class="p">)</span>

    <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">result</span><span class="p">)</span> <span class="o">&gt;</span> <span class="mi">1</span><span class="p">:</span>         <span class="c1"># represent multi-item tensors as python lists</span>
      <span class="k">return</span> <span class="nb">list</span><span class="p">(</span><span class="n">result</span><span class="p">)</span>
    <span class="k">elif</span> <span class="nb">len</span><span class="p">(</span><span class="n">result</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span><span class="p">:</span>      <span class="c1"># extract scalars from single-item tensors</span>
      <span class="k">return</span> <span class="n">result</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span>
    <span class="k">else</span><span class="p">:</span>                       <span class="c1"># represent empty tensors as python None</span>
      <span class="k">return</span> <span class="kc">None</span>

  <span class="n">results</span> <span class="o">=</span> <span class="p">[]</span>
  <span class="k">for</span> <span class="n">record</span> <span class="ow">in</span> <span class="nb">iter</span><span class="p">:</span>
    <span class="n">example</span> <span class="o">=</span> <span class="n">tf</span><span class="o">.</span><span class="n">train</span><span class="o">.</span><span class="n">Example</span><span class="p">()</span>
    <span class="n">example</span><span class="o">.</span><span class="n">ParseFromString</span><span class="p">(</span><span class="nb">bytes</span><span class="p">(</span><span class="n">record</span><span class="p">[</span><span class="mi">0</span><span class="p">]))</span>       <span class="c1"># record is (bytestr, None)</span>
    <span class="n">d</span> <span class="o">=</span> <span class="p">{</span><span class="n">k</span><span class="p">:</span> <span class="n">_get_value</span><span class="p">(</span><span class="n">k</span><span class="p">,</span> <span class="n">v</span><span class="p">)</span> <span class="k">for</span> <span class="n">k</span><span class="p">,</span> <span class="n">v</span> <span class="ow">in</span> <span class="nb">sorted</span><span class="p">(</span><span class="n">example</span><span class="o">.</span><span class="n">features</span><span class="o">.</span><span class="n">feature</span><span class="o">.</span><span class="n">items</span><span class="p">())}</span>
    <span class="n">row</span> <span class="o">=</span> <span class="n">Row</span><span class="p">(</span><span class="o">**</span><span class="n">d</span><span class="p">)</span>
    <span class="n">results</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">row</span><span class="p">)</span>

  <span class="k">return</span> <span class="n">results</span></div>
</pre></div>

          </div>
        </div>
      </div>
      <!-- Sidebar: holds the quick-search form, which submits the query "q" to search.html via GET. -->
      <div class="sphinxsidebar" role="navigation" aria-label="main navigation">
        <div class="sphinxsidebarwrapper">
<!-- The search box starts hidden (display: none) and is revealed by the script below. -->
<div id="searchbox" style="display: none" role="search">
  <!-- The heading doubles as the accessible name of the query field (see aria-labelledby). -->
  <h3 id="searchlabel">Quick search</h3>
    <div class="searchformwrapper">
    <form class="search" action="../../search.html" method="get">
      <!-- aria-labelledby gives the otherwise unlabeled text field an accessible name. -->
      <input type="text" name="q" aria-labelledby="searchlabel" />
      <input type="submit" value="Go" />
      <input type="hidden" name="check_keywords" value="yes" />
      <input type="hidden" name="area" value="default" />
    </form>
    </div>
</div>
<!-- Reveals the search box; relies on "$" being defined (presumably by the jquery.js loaded in the head). -->
<script type="text/javascript">$('#searchbox').show(0);</script>
        </div>
      </div>
      <div class="clearer"></div>
    </div>
    <!-- Bottom navigation bar: same links as the top bar (general index, Python module index,
         breadcrumb trail), repeated after the page content. -->
    <div class="related" role="navigation" aria-label="related navigation">
      <h3>Navigation</h3>
      <ul>
        <li class="right" style="margin-right: 10px">
          <a href="../../genindex.html" title="General Index">index</a></li>
        <li class="right">
          <a href="../../py-modindex.html" title="Python Module Index">modules</a> |</li>
        <li class="nav-item nav-item-0"><a href="../../index.html">TensorFlowOnSpark 1.3.3 documentation</a> &#187;</li>
        <li class="nav-item nav-item-1"><a href="../index.html">Module code</a> &#187;</li>
      </ul>
    </div>
    <!-- Page footer: copyright notice and generator credit. The Sphinx link uses HTTPS. -->
    <div class="footer" role="contentinfo">
        &#169; Copyright 2018, Yahoo Inc.
      Created using <a href="https://www.sphinx-doc.org/">Sphinx</a> 1.7.9.
    </div>
  </body>
</html>